package test;

import com.gfscold.trans.common.app.BaseSQLApp;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class test02 extends BaseSQLApp {

    @Override
    public void handle(StreamTableEnvironment tableEnv, StreamExecutionEnvironment env, String groupId) {
        tableEnv.executeSql("DROP TABLE IF EXISTS out_order_detail;\n" +
                "CREATE TABLE IF NOT EXISTS out_order_detail (\n" +
                "    `id` INT NOT NULL,\n" +
                "    `product_id` INT NOT NULL COMMENT '产品',\n" +
                "    `order_id` INT NOT NULL,\n" +
                "    `unit_dictionary_name` STRING,\n" +
                "    `unit_dictionary_id` STRING COMMENT '包装',\n" +
                "    `price_unit` STRING COMMENT '计费单位',\n" +
                "    `price_quantity` DOUBLE COMMENT '计费数量',\n" +
                "    `actual_price_quantity` DOUBLE,\n" +
                "    `quantity` DOUBLE NOT NULL COMMENT '补充量(箱)',\n" +
                "    `volume` DOUBLE COMMENT '体积',\n" +
                "    `weight` DOUBLE COMMENT '重量',\n" +
                "    `actual_send_quantity` DOUBLE COMMENT '实际发货数量',\n" +
                "    `actual_receive_quantity` DOUBLE NOT NULL COMMENT '实际收货数量',\n" +
                "    `quantity_difference` DOUBLE COMMENT '数量差异',\n" +
                "    `extend_first` STRING COMMENT '预留字段1',\n" +
                "    `extend_second` STRING COMMENT '预留字段2',\n" +
                "    `last_modified_time` STRING COMMENT '更新时间',\n" +
                "    `last_modified_by` STRING COMMENT '更新人',\n" +
                "    `remark` STRING COMMENT '备注',\n" +
                "    `product_number` STRING COMMENT '产品码',\n" +
                "    `line_num` STRING,\n" +
                "    `scan_flag` BOOLEAN COMMENT '扫码标识',\n" +
                "    `out_price` DOUBLE COMMENT '出库单价',\n" +
                "    `out_warehouse_area` STRING COMMENT '出库库位',\n" +
                "    `net_weight` DOUBLE COMMENT '总净重',\n" +
                "    `original_line_num` STRING COMMENT '原始行号',\n" +
                "    `container_number` STRING COMMENT '托盘码',\n" +
                "    `date_of_shelf_life` STRING COMMENT '货架期',\n" +
                "    `date_of_sales_life` STRING COMMENT '售卖期',\n" +
                "    `temperature_range_dictionary_id` STRING COMMENT '温区Id',\n" +
                "    `temperature_range_dictionary_name` STRING COMMENT '温区名称',\n" +
                "    `product_cn_name` STRING COMMENT '产品中文名',\n" +
                "    `manual_allot_status` INT COMMENT '手动分配状态',\n" +
                "    `arc_time` TIMESTAMP COMMENT '插入在线库时间',\n" +
                "    `allot_type` INT COMMENT '分配类型 1-手动分配 0-系统分配',\n" +
                "    `tolerance_upper_limit` DOUBLE COMMENT '容差百分比上限',\n" +
                "    `tolerance_lower_limit` DOUBLE COMMENT '容差百分比下限',\n" +
                "    `op_ts` TimeStamp METADATA FROM 'op_ts' VIRTUAL,\n" +
                "    PRIMARY KEY ( `id` ) NOT ENFORCED\n" +
                ") WITH (\n" +
                "     'connector' = 'mysql-cdc'\n" +
                "    ,'hostname' = 'heukrftr7x7cmlu6no4g.rwlb.rds.aliyuncs.com'\n" +
                "    ,'port' = '3306'\n" +
                "    ,'username' = 'zhhuang4'\n" +
                "    ,'password' = 'HZfR1cqrRcZC^'\n" +
                "    ,'server-time-zone' = 'Asia/Shanghai'\n" +
                "    ,'scan.incremental.snapshot.enabled' = 'true'\n" +
                "    ,'debezium.snapshot.mode'='initial'  \n" +
                "    ,'database-name' = 'gcds'\n" +
                "    ,'table-name' = 'out_order_detail')");
        tableEnv.executeSql("DROP TABLE IF EXISTS gcds_order_detail_modified_time;\n" +
                "CREATE TABLE IF NOT EXISTS gcds_order_detail_modified_time (\n" +
                "    `id` BIGINT,\n" +
                "    `record_id` BIGINT NOT NULL COMMENT '数据ID',\n" +
                "    `created_time` TIMESTAMP COMMENT '插入时间',\n" +
                "    `modified_time` TIMESTAMP COMMENT '更新时间',\n" +
                "    `sync_state` INT NOT NULL COMMENT '状态',\n" +
                "    `version_time` TIMESTAMP,\n" +
                "    PRIMARY KEY ( `record_id` ) NOT ENFORCED\n" +
                ") WITH (\n" +
                "    'connector' = 'jdbc'\n" +
                "    ,'url'='jdbc:mysql://rm-uf6fbob412e5567i4.mysql.rds.aliyuncs.com:3306/dw_mark'\n" +
                "    ,'table-name' = 'gcds_order_detail_modified_time')");
        tableEnv.executeSql("INSERT INTO gcds_order_detail_modified_time(`record_id`, `created_time`, `modified_time`, `sync_state`, `version_time`)\n" +
                "select \n" +
                "    id                 AS record_id\n" +
                "    ,arc_time          AS created_time\n" +
                "    ,op_ts             AS modified_time\n" +
                "    ,0                 AS sync_proc_state\n" +
                "    ,op_ts             AS version_time\n" +
                "FROM out_order_detail");
    }

    public static void main(String[] args) {
        new test02().start(9999, 1, "");
    }
}
